Global Plots from MOM and POP Output

Imports

from prefect import task, Flow, Parameter
from prefect.executors import DaskExecutor
import intake
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_timeseries, parse_cesm_history
from ncar_jobqueue import NCARCluster
from distributed import Client
import xcollection
import hvplot
import hvplot.xarray
import cmocean
import cartopy.crs as ccrs
import numpy as np
import xarray as xr
import holoviews as hv
import panel as pn
import calc
hv.extension('bokeh')
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/core.py:19: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.
  from distributed.utils import format_bytes, parse_bytes, tmpfile
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/core.py:19: FutureWarning: parse_bytes is deprecated and will be removed in a future release. Please use dask.utils.parse_bytes instead.
  from distributed.utils import format_bytes, parse_bytes, tmpfile
/glade/u/home/mgrover/.local/lib/python3.9/site-packages/dask_jobqueue/htcondor.py:6: FutureWarning: parse_bytes is deprecated and will be removed in a future release. Please use dask.utils.parse_bytes instead.
  from distributed.utils import parse_bytes

Build the Data Catalogs

MOM Catalog

# Directory containing the MOM6 history files for this case
mom_paths = ['/glade/scratch/mlevy/archive/mom_oob.003/ocn/hist']
mom_builder = Builder(mom_paths, depth=0)

# Map each MOM history stream to its intake-esm component/frequency metadata;
# streams not listed here fall back to ecgtools' default frequency definitions
# (hence the "Using the default frequency definitions" warning in the output).
mom_stream_info = {
    'mom6.h_bgc_monthly_z':{'component':'ocn', 'frequency':'month_1'},
    'mom6.h_bgc_monthly':{'component':'ocn', 'frequency':'month_1'},
    'mom6.hm':{'component':'ocn', 'frequency':'month_1'},
    'mom6.static':{'component':'ocn', 'frequency':'month_1'}
    # add hm bgc stream
}

# Parse the history files into the builder's catalog dataframe
mom_builder.build(parse_cesm_history, parsing_func_kwargs={'user_streams_dict': mom_stream_info})
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.0s finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/parsers/cesm.py:158: UserWarning: Using the default frequency definitions
  warnings.warn('Using the default frequency definitions')
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    2.1s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    2.2s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   3 out of   3 | elapsed:    2.4s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    2.5s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done 1741 out of 1741 | elapsed:  5.8min finished
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/builder.py:191: UserWarning: Unable to parse 760 assets/files. A list of these assets can be found in `.invalid_assets` attribute.
  self.get_directories().get_filelist()._parse(
Builder(root_path=[PosixPath('/glade/scratch/mlevy/archive/mom_oob.003/ocn/hist')], extension='.nc', depth=0, exclude_patterns=None, njobs=-1)
# Persist the MOM catalog as a CSV + JSON (ESM collection spec) pair
mom_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "../data/cesm-validation-mom-pop-vertical-grid.csv",
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)
Saved catalog location: ../data/cesm-validation-mom-pop-vertical-grid.json and ../data/cesm-validation-mom-pop-vertical-grid.csv
/glade/scratch/mgrover/ipykernel_107241/3445297225.py:1: UserWarning: Unable to parse 760 assets/files. A list of these assets can be found in ../data/invalid_assets_cesm-validation-mom-pop-vertical-grid.csv.
  mom_builder.save(
# Open the freshly saved MOM catalog with intake-esm
mom_catalog = intake.open_esm_datastore('../data/cesm-validation-mom-pop-vertical-grid.json')

POP Catalog

# Directory containing the POP history files for the comparison case
pop_paths = ['/glade/scratch/mlevy/archive/pop_no_mcog/ocn/hist']

pop_builder = Builder(pop_paths, depth=0)

# Empty dict -> rely entirely on ecgtools' default stream/frequency definitions
pop_stream_info = {}

pop_builder.build(parse_cesm_history, parsing_func_kwargs={'user_streams_dict': pop_stream_info})
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.0s finished
[Parallel(n_jobs=-1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.3s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    0.7s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   3 out of   3 | elapsed:    1.1s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    1.4s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done 742 out of 742 | elapsed:  5.8min finished
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/ecgtools/builder.py:191: UserWarning: Unable to parse 1 assets/files. A list of these assets can be found in `.invalid_assets` attribute.
  self.get_directories().get_filelist()._parse(
Builder(root_path=[PosixPath('/glade/scratch/mlevy/archive/pop_no_mcog/ocn/hist')], extension='.nc', depth=0, exclude_patterns=None, njobs=-1)
# Persist the POP catalog with the same layout/aggregations as the MOM catalog
pop_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "../data/cesm-validation-pop.csv",
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)
Saved catalog location: ../data/cesm-validation-pop.json and ../data/cesm-validation-pop.csv
/glade/scratch/mgrover/ipykernel_107241/3824853795.py:1: UserWarning: Unable to parse 1 assets/files. A list of these assets can be found in ../data/invalid_assets_cesm-validation-pop.csv.
  pop_builder.save(

Generic Catalog Building Prefect

@task
def build_catalog(data_directory_paths, output_catalog_path, file_type, stream_info, depth=0):
    """Build an ecgtools catalog from CESM output files.

    Parameters
    ----------
    data_directory_paths : list of str
        Directories to scan for netCDF assets.
    output_catalog_path : str
        Destination for the catalog (currently unused — kept for interface
        compatibility; saving is handled elsewhere in this notebook).
    file_type : str
        Either ``'timeseries'`` or ``'history'``; selects the CESM parser.
    stream_info : dict
        User stream definitions forwarded to the parser.
    depth : int, optional
        Directory recursion depth for the Builder (default 0).

    Returns
    -------
    ecgtools.Builder
        The builder with its catalog populated.

    Raises
    ------
    ValueError
        If ``file_type`` is not one of the recognized values.
    """
    # Resolve the parser up front and fail fast on unknown values.
    # (Previously an unrecognized file_type left `parser` unbound and
    # surfaced later as a confusing NameError.)
    if file_type == 'timeseries':
        parser = parse_cesm_timeseries
    elif file_type == 'history':
        parser = parse_cesm_history
    else:
        raise ValueError(
            f"file_type must be 'timeseries' or 'history', got {file_type!r}"
        )

    # Setup the Builder
    catalog_builder = Builder(data_directory_paths, depth=depth)

    # Build the Catalog and hand the populated builder to downstream tasks
    catalog_builder.build(parser, parsing_func_kwargs={'user_streams_dict': stream_info})
    return catalog_builder

Read in the Data

Spin up a Cluster

# Spin up a Dask cluster through ncar-jobqueue (PBS under the hood, per the
# output below), request 10 workers, and connect a distributed client; the
# bare `client` on the last line renders the dashboard widget in the notebook.
cluster = NCARCluster()
cluster.scale(10)
client = Client(cluster)
client

Client

Client-37153701-376e-11ec-9850-3cecef1b11e4

Connection method: Cluster object Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status

Cluster Info

def temporal_average(ds):
    """Return the long-term mean of ``ds``.

    Collapses the ``time`` dimension by averaging, leaving every other
    dimension untouched.
    """
    averaged = ds.mean(dim='time')
    return averaged

def yearly_average(ds, model):
    """Compute a yearly climatology of ``ds``.

    NOTE(review): neither ``center_time`` nor ``geocat`` is defined or
    imported anywhere in this notebook, so calling this function as-is
    raises NameError — confirm the intended imports (``geocat.comp`` and a
    POP time-centering helper). This is consistent with the annual-mean
    task being commented out in the flow below.
    """
    if model == 'pop':
        ds = center_time(ds)
    return geocat.comp.climatologies.climatology(ds, "year", "time")

def global_average(ds, configuration_dict):
    """Area-weighted global mean of ``ds``, normalized by total area.

    ``configuration_dict`` must supply the grid metadata keys
    ``horizontal_dims``, ``area_field`` and ``land_sea_mask``.
    """
    return calc.global_mean(
        ds,
        configuration_dict['horizontal_dims'],
        configuration_dict['area_field'],
        configuration_dict['land_sea_mask'],
        normalize=True,
    )

def global_integral(ds, configuration_dict):
    """Area-weighted global integral of ``ds`` (un-normalized global mean).

    ``configuration_dict`` must supply the grid metadata keys
    ``horizontal_dims``, ``area_field`` and ``land_sea_mask``.
    """
    return calc.global_mean(
        ds,
        configuration_dict['horizontal_dims'],
        configuration_dict['area_field'],
        configuration_dict['land_sea_mask'],
        normalize=False,
    )
    

def add_cyclic_point(ds):
    """
    Add a cyclic point to POP model output dataset
    Parameters
    ----------
    ds : xarray.Dataset
        POP output dataset
    Returns
    -------
    dso : xarray.Dataset
        modified POP model output dataset with cyclic point added
    """
    # Number of points along the logical-x (nlon) dimension
    ni = ds.TLONG.shape[1]

    # [xL, xR) selects one full wrap of longitudes out of the doubled
    # array built below, recentering the grid seam
    xL = int(ni / 2 - 1)
    xR = int(xL + ni)

    tlon = ds.TLONG.data
    tlat = ds.TLAT.data

    # Shift longitudes >= the leftmost column down by 360 so the doubled
    # array increases monotonically across the seam
    tlon = np.where(np.greater_equal(tlon, min(tlon[:, 0])), tlon - 360.0, tlon)
    lon = np.concatenate((tlon, tlon + 360.0), 1)
    lon = lon[:, xL:xR]

    # Grid-specific patch; row indices assume the standard POP ni == 320
    # grid — presumably gx1v* — TODO confirm for other grids
    if ni == 320:
        lon[367:-3, 0] = lon[367:-3, 0] + 360.0
    lon = lon - 360.0

    # Append the first column (+360) to close the longitude loop
    lon = np.hstack((lon, lon[:, 0:1] + 360.0))
    if ni == 320:
        lon[367:, -1] = lon[367:, -1] - 360.0

    # Trick cartopy into doing the right thing:
    # Cartopy gets confused when the cyclic coords are identical
    lon[:, 0] = lon[:, 0] - 1e-8

    # Periodicity
    lat = np.concatenate((tlat, tlat), 1)
    lat = lat[:, xL:xR]
    lat = np.hstack((lat, lat[:, 0:1]))

    TLAT = xr.DataArray(lat, dims=('nlat', 'nlon'))
    TLONG = xr.DataArray(lon, dims=('nlat', 'nlon'))

    dso = xr.Dataset({'TLAT': TLAT, 'TLONG': TLONG})

    # Copy vars
    varlist = [v for v in ds.data_vars if v not in ['TLAT', 'TLONG']]
    for v in varlist:
        v_dims = ds[v].dims
        # Variables lacking either horizontal dim are copied through unchanged
        if not ('nlat' in v_dims and 'nlon' in v_dims):
            dso[v] = ds[v]
        else:
            # Determine and sort other dimensions
            other_dims = set(v_dims) - {'nlat', 'nlon'}
            other_dims = tuple([d for d in v_dims if d in other_dims])
            lon_dim = ds[v].dims.index('nlon')
            # Double, window, and close the field along nlon, mirroring the
            # coordinate treatment above
            field = ds[v].data
            field = np.concatenate((field, field), lon_dim)
            field = field[..., :, xL:xR]
            field = np.concatenate((field, field[..., :, 0:1]), lon_dim)
            dso[v] = xr.DataArray(field, dims=other_dims + ('nlat', 'nlon'), attrs=ds[v].attrs)

    # Copy coords
    for v, da in ds.coords.items():
        if not ('nlat' in da.dims and 'nlon' in da.dims):
            dso = dso.assign_coords(**{v: da})
    return dso
def grab_static_data(catalog):
    """Open the MOM static (grid metadata) file referenced by the catalog."""
    static_path = catalog.search(stream='mom6.static').df.path.values[0]
    return xr.open_dataset(static_path)
@task
def read_catalog(path, csv_kwargs):
    """Open an intake-esm datastore from a catalog JSON file."""
    datastore = intake.open_esm_datastore(path, csv_kwargs=csv_kwargs)
    return datastore

@task
def subset_catalog(catalog, search_dict):
    """Narrow the catalog using the supplied keyword search criteria."""
    subset = catalog.search(**search_dict)
    return subset

@task
def subset_dates(catalog, date_subset):
    """Restrict the catalog to a slice of its available dates."""
    selected_dates = catalog.df.date[date_subset].values
    return catalog.search(date=selected_dates)

@task
def convert_to_collection(dsets):
    """Wrap a dataset dictionary in an xcollection.Collection."""
    collection = xcollection.Collection(dsets)
    return collection

@task
def load_data(catalog, cdf_kwargs):
    """Load the catalog's assets into a dictionary of xarray datasets."""
    dataset_dict = catalog.to_dataset_dict(cdf_kwargs=cdf_kwargs)
    return dataset_dict

@task
def long_term_mean(collection):
    """Apply the all-time average to every dataset in the collection."""
    averaged = collection.map(temporal_average)
    return averaged

@task
def annual_mean(collection, model):
    """Apply the yearly climatology to every dataset in the collection."""
    climatology = collection.map(yearly_average, model)
    return climatology

@task
def global_mean(collection, configuration_dict):
    """Apply the normalized global average to every dataset in the collection."""
    averaged = collection.map(global_average, configuration_dict)
    return averaged

@task
def global_integral(collection, configuration_dict):
    """Apply the area-weighted global integral to every dataset in the collection.

    NOTE: this @task shadows the module-level helper also named
    ``global_integral``, so the original body
    ``collection.map(global_integral, configuration_dict)`` mapped the task
    object itself rather than the helper. The integral computation is
    therefore inlined here as a local function.
    """
    def _integral(ds, cfg):
        # Same computation as the module-level helper: calc.global_mean with
        # normalize=False yields an area integral rather than a mean.
        return calc.global_mean(
            ds,
            cfg['horizontal_dims'],
            cfg['area_field'],
            cfg['land_sea_mask'],
            normalize=False,
        )

    return collection.map(_integral, configuration_dict)

@task
def fix_grid(collection):
    # NOTE(review): despite its name, this body is identical to
    # `long_term_mean` above (it just maps `temporal_average`) — this looks
    # like a copy-paste placeholder; confirm what grid fix was intended here.
    return collection.map(temporal_average)
# Per-model configuration for the (planned) global mean/integral tasks.
# The original cell was a SyntaxError: list entries must be dicts
# ({'TEMP': {...}}) rather than bare key:value pairs, and 'Tg/year' was
# accidentally wrapped in a set literal; both entries now share the shape
# {variable_name: {'out_units': <unit string>}}.
test = {
    'mom': {
        'integral_variables': [{'FG_CO2': {'out_units': 'Tg/year'}}],
        'mean_variables': [{'TEMP': {'out_units': 'degC'}}],
        'horizontal_dims': ('yh', 'xh'),
        'area_field': 'area_t',
        'land_sea_mask': 'wet',
    }
}
  File "/glade/scratch/mgrover/ipykernel_198907/1801437483.py", line 2
    'mean_variables':['TEMP':{'out_units':'degC'}],
                            ^
SyntaxError: invalid syntax
import ast
# Wire up the diagnostic pipeline; tasks are composed lazily here and only
# execute when `flow.run(...)` is called below.
with Flow('ocean-diagnostics') as flow:
    
    # Setup our parameters
    path = Parameter('path', default='../data/cesm-validation-pop.json')
    # literal_eval converts the stringified variable lists in the CSV back
    # into real Python lists when the catalog is read
    csv_kwargs = Parameter('csv_kwargs', default={"converters": {"variables": ast.literal_eval}})
    search_dict = Parameter('search_dict', default={})
    subset_times = Parameter('subset_times', default=slice(-48, -1))
    cdf_kwargs = Parameter('cdf_kwargs', default={'chunks':{}})
    # NOTE(review): `model` and `configuration_dict` are only consumed by the
    # commented-out tasks below, which is what triggers Prefect's "Tasks were
    # created but not added to the flow" warning in the cell output
    model = Parameter('model', default='pop')
    configuration_dict = Parameter('configuration_dict', default={})
    
    # Setup our functions/tasks
    cat = read_catalog(path, csv_kwargs)
    
    # Subset the catalog
    cat_subset = subset_catalog(cat, search_dict)
    
    # Add the date subset
    cat_subset = subset_dates(cat_subset, subset_times)
    
    # Call to dataset dict
    dsets = load_data(cat_subset, cdf_kwargs=cdf_kwargs)
    
    # Convert to an xcollection
    collection = convert_to_collection(dsets)
    
    # Calculate long-term mean
    long_term_average = long_term_mean(collection)
    
    # Calculate the annual mean
    #annual_average = annual_mean(collection, model)
    
    # Calculate the global mean and global integral
    #mean_globe = global_mean(annual_average, configuration_dict)
    #integral_globe = global_integral(annual_average, configuration_dict)
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/contextlib.py:124: UserWarning: Tasks were created but not added to the flow: {<Parameter: model>, <Parameter: configuration_dict>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
  next(self.gen)
flow.visualize()
_images/prefect_workflow_averages_20_0.svg

Execute the Workflow

# Run the flow on the existing Dask cluster for the POP case, requesting
# four BGC tracers from the monthly pop.h stream
executor = DaskExecutor(address=client.scheduler.address)
variables = ['O2', 'PO4', 'SiO3', 'NO3']
pop_result = flow.run(path = '../data/cesm-validation-pop.json',
                      search_dict={'stream':'pop.h',
                                   'frequency':'month_1',
                                   'variables':variables},
                      executor=executor)
[2021-10-27 16:11:36-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'ocean-diagnostics'
[2021-10-27 16:11:36-0600] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://10.12.206.57:39346
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/distributed/scheduler.py:5490: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
[2021-10-27 16:12:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
list(pop_result.result.keys())
[<Task: read_catalog>,
 <Task: subset_catalog>,
 <Task: subset_dates>,
 <Parameter: subset_times>,
 <Parameter: cdf_kwargs>,
 <Task: convert_to_collection>,
 <Parameter: path>,
 <Parameter: csv_kwargs>,
 <Parameter: search_dict>,
 <Task: load_data>,
 <Task: long_term_mean>]
pop_result.result[long_term_average]
<Success: "Task run succeeded.">
pop_result.result[-1]
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/glade/scratch/mgrover/ipykernel_301136/166169208.py in <module>
----> 1 pop_result.result[-1]

KeyError: -1
flow.get_tasks()
[<Parameter: path>,
 <Parameter: search_dict>,
 <Task: subset_dates>,
 <Task: convert_to_collection>,
 <Task: long_term_mean>,
 <Task: annual_mean>,
 <Parameter: configuration_dict>,
 <Task: read_catalog>,
 <Parameter: csv_kwargs>,
 <Task: subset_catalog>,
 <Parameter: subset_times>,
 <Task: load_data>,
 <Parameter: cdf_kwargs>,
 <Parameter: model>,
 <Task: global_mean>,
 <Task: global_integral>]
output_task = flow.get_tasks()[-5]
output_task
<Task: long_term_mean>
# Re-run the same flow for the MOM case, pointing at the MOM catalog and
# the vertically-interpolated BGC stream (no frequency filter needed here)
mom_result = flow.run(path = '../data/cesm-validation-mom-pop-vertical-grid.json',
                      search_dict={'stream':'mom6.h_bgc_monthly_z',
                                   'variables':variables},
                      executor=executor)
[2021-10-27 11:28:52-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'ocean-diagnostics'
INFO:prefect.FlowRunner:Beginning Flow run for 'ocean-diagnostics'
[2021-10-27 11:28:52-0600] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://10.12.206.39:33277
INFO:prefect.DaskExecutor:Connecting to an existing Dask cluster at tcp://10.12.206.39:33277
/glade/work/mgrover/miniconda3/envs/cesm-collections-dev/lib/python3.9/site-packages/distributed/scheduler.py:5490: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
[2021-10-27 11:28:59-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded
# Re-open the MOM catalog so the static grid file can be pulled
mom_catalog = intake.open_esm_datastore('../data/cesm-validation-mom-pop-vertical-grid.json')
# Pull the long-term-mean collection out of the MOM flow-run result
mom_output = mom_result.result[output_task]._result.value
mom_ds = mom_output['ocn.mom6.h_bgc_monthly_z.mom_oob.003']
static_ds = grab_static_data(mom_catalog)
# Merge in the static grid fields, promote them to coordinates, and rename
# to lon/lat so hvplot picks them up
mom_with_coords = xr.merge([mom_ds, static_ds]).set_coords(['geolon', 'geolat'])[variables].compute().rename({'geolon':'lon',
                                                                                                              'geolat':'lat'})
# Pull the POP long-term-mean collection and add a cyclic point for plotting
collection = pop_result.result[output_task]._result.value
ds = collection['ocn.pop.h.pop_no_mcog'][variables].drop_vars(['ULAT', 'ULONG']).compute()
dso = add_cyclic_point(ds).set_coords(['TLAT', 'TLONG']).rename({'TLAT':'lat',
                                                                 'TLONG':'lon'})
# Copy the coordinate attributes from POP onto MOM so both plots share labels
mom_with_coords['lon'].attrs = dso.lon.attrs
mom_with_coords['lat'].attrs = dso.lat.attrs
# Put POP on the MOM vertical coordinate so the depth selectors line up
dso = dso.rename({'z_t':'z_pop_l'})
dso['z_pop_l'] = mom_with_coords.z_pop_l
# Interactive global map of each variable from the MOM case; rasterize keeps
# the browser payload small and project=True pre-projects onto PlateCarree
mom_plot = mom_with_coords.hvplot.quadmesh(x='lon',
                                           y='lat',
                                           z=variables,
                                           cmap='inferno',
                                           rasterize=True,
                                           coastline=True,
                                           projection=ccrs.PlateCarree(),
                                           title='MOM Case: mom_oob.003',
                                           project=True
                                          )
# Matching map for the POP case, using the same colormap and projection
pop_plot = dso.hvplot.quadmesh(x='lon',
                               y='lat',
                               z=variables,
                               geo=True,
                               rasterize=True, 
                               coastline=True, 
                               cmap='inferno', 
                               projection=ccrs.PlateCarree(),
                               title='POP Case: pop_no_mcog',
                               project=True
                              )
# Stack the two maps in one column with linked (shared) axes for comparison
hv.Layout([mom_plot, pop_plot]).opts(shared_axes=True).cols(1)